1 Advanced Relational Operations

2 flatten Modifier

3 Nested foreach

4 More JOIN Implementations

4.1 Joining Small to Large Data

  • When joining a small file with a big file, it can make sense to send the small file to every machine (node), load it into memory, and then do the join by streaming through the large file and looking up each record's key in the in-memory copy of the small file. This is called a fragment-replicate join (you fragment one file and replicate the other). It supports only inner and left outer joins.

    daily = load '/NYSE_daily.txt' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float, volume:int, adj_close:float);
    divs = load '/NYSE_dividends.txt' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);
    jnd = join daily by (exchange, symbol), divs by (exchange, symbol) using 'replicated';
  • The using 'replicated' clause tells Pig to use the fragment-replicate algorithm. The second input listed in the join (here, divs) is always the one loaded into memory. If Pig cannot fit the replicated input into memory, it will issue an error and fail.
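The idea behind the fragment-replicate algorithm can be sketched in plain Python as a broadcast hash join: build a hash table from the small (replicated) input, then stream the large (fragmented) input and probe the table. This is a simplified illustration, not Pig's actual implementation; all names and data are made up.

```python
def fragment_replicate_join(large_rows, small_rows, key):
    """Inner join: hash the small input in memory, stream the large input.
    In the real algorithm each map task would run this over its fragment
    of the large file, with the full small file loaded on every node."""
    # Step 1: load the replicated (small) input into an in-memory hash table.
    table = {}
    for row in small_rows:
        table.setdefault(key(row), []).append(row)
    # Step 2: stream the fragmented (large) input and probe the table.
    out = []
    for row in large_rows:
        for match in table.get(key(row), []):  # no match -> row dropped (inner join)
            out.append(row + match)
    return out

# Illustrative data shaped like the NYSE example above
daily = [("NYSE", "IBM", "2009-01-02", 100.0),
         ("NYSE", "GE",  "2009-01-02", 30.0)]
divs  = [("NYSE", "IBM", "2009-01-02", 0.5)]
jnd = fragment_replicate_join(daily, divs, key=lambda r: (r[0], r[1]))
# only the (NYSE, IBM) pair matches; GE has no dividend record
```

Because the probe side never needs other nodes' data, no shuffle or reduce phase is required, which is why this join runs entirely map-side.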

4.2 Joining Skewed Data

  • Sometimes the data you will be processing with Pig has significant skew in the number of records per key. Pig’s default join algorithm is very sensitive to skew, because it collects all of the records for a given key together on a single reducer. In some datasets, there are a few keys that have far more records than other keys. This results in a few reducers that will take much longer than the rest. To deal with this, Pig provides skew join.

    users = load 'users' as (name:chararray, city:chararray);
    cinfo = load 'cityinfo' as (city:chararray, population:int);
    jnd = join cinfo by city, users by city using 'skewed';
  • The second input in the join, in this case users, is the one that will be sampled and have its heavily populated keys split across reducers. Records from the first input that carry those keys will be replicated to the same set of reducers.
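The partitioning idea can be sketched in simplified Python: sample the skewed input to find heavy keys, split each heavy key's records round-robin across several reducers, and copy the other input's matching records to every one of those reducers. The threshold, the deterministic `bucket` function, and all names are illustrative assumptions, not Pig's actual sampler.

```python
from collections import Counter

def bucket(k, n):
    # deterministic stand-in for a hash partitioner
    return sum(ord(c) for c in str(k)) % n

def skew_plan(sample_rows, key, n_reducers, threshold):
    """From a sample of the skewed input, decide how many reducers
    each heavy key should be spread over."""
    counts = Counter(key(r) for r in sample_rows)
    return {k: min(n_reducers, c // threshold + 1)
            for k, c in counts.items() if c > threshold}

def route(rows, key, plan, n_reducers, replicate_hot):
    """Assign rows to reducers. Heavy keys from the skewed input are split
    round-robin; matching rows from the other input are copied to every
    reducer handling that key."""
    reducers = [[] for _ in range(n_reducers)]
    rr = Counter()
    for row in rows:
        k = key(row)
        if k in plan:
            if replicate_hot:
                for i in range(plan[k]):        # other input: replicate
                    reducers[i].append(row)
            else:                               # skewed input: split
                reducers[rr[k] % plan[k]].append(row)
                rr[k] += 1
        else:
            reducers[bucket(k, n_reducers)].append(row)
    return reducers

# Illustrative data: "NYC" is the heavy key
users = [("alice", "NYC"), ("bob", "NYC"), ("carol", "NYC"), ("dave", "LA")]
cinfo = [("NYC", 8_000_000), ("LA", 4_000_000)]
plan = skew_plan(users, key=lambda r: r[1], n_reducers=3, threshold=1)
user_parts = route(users, lambda r: r[1], plan, 3, replicate_hot=False)
city_parts = route(cinfo, lambda r: r[0], plan, 3, replicate_hot=True)
```

The trade-off is that replicating the non-skewed side costs extra network traffic, in exchange for no single reducer having to hold an entire heavy key.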

4.3 Joining Sorted Data

  • If your inputs are already sorted on the join key, the join can be done efficiently in the map phase by opening both files and walking through them. Pig refers to this as a merge join. This can be done without a reduce phase, and so it is more efficient than a default join. For example:

    daily = load '/NYSE_daily.txt' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float, volume:int, adj_close:float);
    srtd = order daily by symbol;
    divs = load '/NYSE_dividends.txt' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);
    dsrtd = order divs by symbol;
    jnd = join srtd by symbol, dsrtd by symbol using 'merge';
    dump jnd;
  • The result:
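The two-cursor walk behind a merge join can be sketched in simplified Python: advance whichever side has the smaller key, and when the keys match, emit the cross product of the equal-key runs on both sides. This ignores the sampling/index step Pig actually uses to line up the map-side inputs; data and names are illustrative.

```python
def merge_join(left, right, key):
    """Inner join of two inputs already sorted on the join key,
    done in a single forward pass over each input."""
    out = []
    i = j = 0
    while i < len(left) and j < len(right):
        lk, rk = key(left[i]), key(right[j])
        if lk < rk:
            i += 1                      # left is behind: advance it
        elif lk > rk:
            j += 1                      # right is behind: advance it
        else:
            # collect the run of equal keys on each side, emit the cross product
            i2 = i
            while i2 < len(left) and key(left[i2]) == lk:
                i2 += 1
            j2 = j
            while j2 < len(right) and key(right[j2]) == rk:
                j2 += 1
            for a in left[i:i2]:
                for b in right[j:j2]:
                    out.append(a + b)
            i, j = i2, j2
    return out

# Both inputs sorted by symbol, as 'order ... by symbol' would produce
srtd  = [("NYSE", "GE", 30.0), ("NYSE", "IBM", 100.0)]
dsrtd = [("NYSE", "GE", 0.1), ("NYSE", "IBM", 0.5)]
jnd = merge_join(srtd, dsrtd, key=lambda r: r[1])
```

Because each input is read exactly once and nothing is buffered beyond the current run of equal keys, this is cheaper than shuffling both inputs through a reduce phase.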

5 Cogroup

6 Using cogroup with foreach

7 Union

8 Cross

9 Parameter Substitution

10 Pig Macro

10.1 A Pig Macro Example

The result

10.2 Macro Modification 1: Interested in a Month of a Year

Result for 2009-02

10.3 Macro Modification 2: Another Month

Result for 2009-08